-module (flatula_table).

-export ([ start_link/1 ]).

-behaviour (gen_server).
-export ([ init/1,
	   handle_call/3,
	   handle_cast/2,
	   handle_info/2,
	   terminate/2,
	   code_change/3 ]).

-include_lib ("kernel/include/file.hrl").

-record (state, { base_path,
		  max_total_bytes,
		  max_file_bytes,
		  cur_file,
		  cur_fd,
		  cur_ofs,
		  lookup,
		  total_bytes }).

-define (HEADER_LEN, 8).

%
% public
%

start_link (Options) ->
  gen_server:start_link (?MODULE, Options, []).

%
% gen_server callbacks
%

init (Options) ->
  process_flag (trap_exit, true),
  BasePath = option (Options, base_path),
  MaxTotalBytes = option (Options, max_total_bytes),
  MaxFileBytes = option (Options, max_file_bytes),
  State = open (#state { base_path = BasePath,
			 max_total_bytes = MaxTotalBytes,
			 max_file_bytes = MaxFileBytes }),
  { ok, State }.

handle_call ({ read, Id }, _From, State) ->
  { Result, NewState } = read (State, Id),
  { reply, Result, NewState };
handle_call ({ write, Term }, _From, State) ->
  { Result, NewState } = write (State, Term),
  { reply, Result, NewState };
handle_call ({ info, size }, _From, State) ->
  { reply, { size, State#state.total_bytes }, State };
handle_call ({ info, max_file_bytes }, _From, State) ->
  { reply, { max_file_bytes, State#state.max_file_bytes }, State };
handle_call ({ info, max_total_bytes }, _From, State) ->
  { reply, { max_total_bytes, State#state.max_total_bytes }, State };
handle_call (close, _From, State) ->
  { stop, normal, ok, State }.

handle_cast (_Request, State) ->
  { noreply, State }.

handle_info (_Msg, State) ->
  { noreply, State }.

terminate (_Reason, State) ->
  close (State),
  ok.

code_change (_OldVsn, State, _Extra) ->
  process_flag (trap_exit, true),
  { ok, State }.

%
% private
%

read (State = #state { cur_file = Cur }, _Id = { F, _, _ }) when F > Cur ->
  { not_found, State };

read (State = #state { cur_file = Cur, cur_ofs = CurOfs },
      _Id = { Cur, Ofs, Len })
	when Ofs + Len > CurOfs ->
  { not_found, State };

read (State = #state { cur_file = Cur, cur_fd = Fd, cur_ofs = CurOfs },
      _Id = { Cur, Ofs, Len })
	when Ofs + Len =< CurOfs ->
  Result = read_term (Fd, Ofs, Len),
  { Result, State };

read (State = #state { lookup = Lookup }, _Id = { File, Ofs, Len }) ->
  case gb_trees:lookup (File, Lookup) of
    { value, { Fd, Size } } ->
      case Ofs + Len > Size of
	true ->
	  { not_found, State };
	false ->
	  Result = read_term (Fd, Ofs, Len),
	  { Result, State }
      end;
    none ->
      { not_found, State }
  end.

read_term (Fd, Ofs, Len) ->
  case file:pread (Fd, Ofs, ?HEADER_LEN + Len) of
    { ok, eof } ->
      not_found;
    { ok, Data } ->
      try
	Header = header (Len),
	<< Header:?HEADER_LEN/binary, Binary:Len/binary >> = Data,
	{ ok, binary_to_term (Binary) }
      catch
	_:E -> { error, E }
      end;
    Error ->
      Error
  end.

write (State = #state { max_file_bytes = Max,
			cur_file = Cur,
			cur_fd = CurFd,
			cur_ofs = CurOfs,
			lookup = Lookup }, Term) when CurOfs > Max ->
  file:close (CurFd),
  NewCurFd = open_read_or_die (State, Cur),
  Next = Cur + 1,
  NextFd = open_readwrite_or_die (State, Next),
  NewLookup = gb_trees:insert (Cur, { NewCurFd, CurOfs }, Lookup),
  NewState = State#state { cur_file = Next,
			   cur_fd = NextFd,
			   cur_ofs = 0,
			   lookup = NewLookup },
  write_metadata (NewState),
  write (NewState, Term);

write (State = #state { max_total_bytes = Max,
			total_bytes = Total,
			lookup = Lookup }, Term) when Total > Max ->
  { First, { Fd, Size }, NewLookup } = gb_trees:take_smallest (Lookup),
  file:close (Fd),
  NewState = State#state { total_bytes = Total - Size,
			   lookup = NewLookup },
  write_metadata (NewState),
  file:delete (path (State, First)),
  write (NewState, Term);

write (State = #state { cur_file = Cur,
			cur_fd = Fd,
			cur_ofs = Ofs,
			total_bytes = Total }, Term) ->
  B = term_to_binary (Term, [ { minor_version, 1 } ]),
  Len = size (B),
  case file:pwrite (Fd, Ofs, [ header (Len), B ]) of
    ok ->
      Id = { Cur, Ofs, Len },
      NewState = State#state { cur_ofs = Ofs + Len + ?HEADER_LEN,
			       total_bytes = Total + Len + ?HEADER_LEN },
      { Id, NewState };
    Error ->
      erlang:error (Error)
  end.

write_metadata (State = #state { cur_file = Cur, lookup = Lookup }) ->
  List = [ { F, Size } || { F, { _Fd, Size } } <- gb_trees:to_list (Lookup) ],
  MetaData = term_to_binary ({ ?MODULE, 1, Cur, List }),
  Path = meta_path (State),
  case file:write_file (Path, MetaData) of
    ok    -> ok;
    Error -> erlang:error ({ write, Path, Error })
  end.

open (State) ->
  ok = filelib:ensure_dir (meta_path (State)),
  { Cur, Lookup, Total } =
    try
      { ok, MetaData } = file:read_file (meta_path (State)),
      { ?MODULE, 1, OldCur, List } = binary_to_term (MetaData),
      Files = case file:read_file_info (path (State, OldCur)) of
		{ ok, #file_info { size = Size } } ->
		  [ { OldCur, Size } | List ];
		_ ->
		  List
	      end,
      open_files (State, Files, OldCur + 1, gb_trees:empty (), 0)
    catch
      _:_ -> { 0, gb_trees:empty (), 0 }
    end,
  CurFd = open_readwrite_or_die (State, Cur),
  NewState = State#state { cur_file = Cur,
			   cur_fd = CurFd,
			   cur_ofs = 0,
			   lookup = Lookup,
			   total_bytes = Total },
  write_metadata (NewState),
  NewState.

open_files (_State, [], MaxFile, Lookup, Total) ->
  { MaxFile, Lookup, Total };
open_files (State, [ { File, Size } | Files ], MaxFile, Lookup, Total) ->
  try
    Fd = open_read_or_die (State, File),
    NewLookup = gb_trees:insert (File, { Fd, Size }, Lookup),
    NewMaxFile = case File > MaxFile of
		   true  -> File;
		   false -> MaxFile
		 end,
    open_files (State, Files, NewMaxFile, NewLookup, Total + Size)
  catch
    _:_ ->
      open_files (State, Files, MaxFile, Lookup, Total)
  end.

close (#state { cur_fd = CurFd, lookup = Lookup }) ->
  file:close (CurFd),
  Fds = [ Fd || { _File, { Fd, _Size } } <- gb_trees:to_list (Lookup) ],
  lists:foreach (fun (Fd) ->
		   file:close (Fd)
		 end,
		 Fds).

open_read_or_die (State, File) ->
  open_or_die (State, File, [ read, raw, binary ]).

open_readwrite_or_die (State, File) ->
  open_or_die (State, File, [ read, write, raw, binary ]).

open_or_die (State, File, Options) ->
  Path = path (State, File),
  case file:open (Path, Options) of
    { ok, Fd } ->
      Fd;
    Error ->
      erlang:error ({ open, Path, Error })
  end.

header (Len) when Len < 16#100000000 ->
  << $E, $E, $E, $E, Len:32/big-unsigned-integer >>.

path (State, File) ->
  State#state.base_path ++ [ $. | integer_to_list (File) ].

meta_path (State) ->
  State#state.base_path ++ ".meta".

option (Options, Name) ->
  case lists:keysearch (Name, 1, Options) of
    { value, { Name, Value } } ->
      Value;
    _ ->
      erlang:error ({ missing_option, Name })
  end.
